home *** CD-ROM | disk | FTP | other *** search
- /*
- ** termMsgQueue.c
- **
- ** Special MsgPort like queue management
- **
- ** Copyright © 1990-1995 by Olaf `Olsen' Barthel
- ** All Rights Reserved
- */
-
- #include "termGlobal.h"
-
- /* DestroyPooled(struct MsgItem *Item):
- *
- * Default destructor for CreateMsgItem().
- */
-
- STATIC VOID __stdargs
- DestroyPooled(struct MsgItem *Item)
- {
- FreeVecPooled(Item);
- }
-
- /* GetMsgItem(struct MsgQueue *Queue):
- *
- * Get the next message from the queue handle,
- * similar to GetMsg().
- */
-
- APTR __regargs
- GetMsgItem(struct MsgQueue *Queue)
- {
- // Valid handle?
-
- if(Queue)
- {
- APTR Item;
-
- // Gain access
-
- ObtainSemaphore(&Queue -> Access);
-
- // Any item available?
-
- if(Queue -> MsgList . mlh_Head -> mln_Succ)
- {
- // Remove the first item
-
- Remove((struct Node *)(Item = Queue -> MsgList . mlh_Head));
-
- // Are there any tasks waiting for the queue to become smaller?
-
- if(Queue -> WaitList . mlh_Head -> mln_Succ)
- {
- struct SemaphoreRequest *WaitRequest = (struct SemaphoreRequest *)Queue -> WaitList . mlh_Head;
-
- // Remove one waiting task from the list
-
- Remove((struct Node *)WaitRequest);
-
- // Wake the task up
-
- Signal(WaitRequest -> sr_Waiter,SIGF_SINGLE);
- }
-
- // One message taken
-
- if(Queue -> QueueSize)
- Queue -> QueueSize--;
- }
- else
- Item = NULL;
-
- // Drop the semaphore
-
- ReleaseSemaphore(&Queue -> Access);
-
- // Return the item read
-
- return(Item);
- }
- else
- return(NULL);
- }
-
- /* PutMsgItem(struct MsgQueue *Queue,struct MsgItem *Item):
- *
- * Add a message item to a queue handle, similar to
- * PutMsg().
- */
-
- VOID __regargs
- PutMsgItem(struct MsgQueue *Queue,struct MsgItem *Item)
- {
- // Valid data?
-
- if(Queue && Item)
- {
- // Gain access to the handle
-
- ObtainSemaphore(&Queue -> Access);
-
- // Are we to discard this message?
-
- if(Queue -> Discard)
- {
- ReleaseSemaphore(&Queue -> Access);
-
- DeleteMsgItem(Item);
- }
- else
- {
- // Are we to watch for a maximum queue size?
-
- if(Queue -> MaxSize)
- {
- // Maximum queue size already reached?
-
- if(Queue -> MaxSize == Queue -> QueueSize)
- {
- struct SemaphoreRequest WaitRequest;
-
- // That's me
-
- WaitRequest . sr_Waiter = FindTask(NULL);
-
- // Add this task to the waiting list
-
- AddTail((struct List *)&Queue -> WaitList,(struct Node *)&WaitRequest);
-
- // Careful, please
-
- Forbid();
-
- // Drop the semaphore
-
- ReleaseSemaphore(&Queue -> Access);
-
- // Clear the one-shot flag
-
- ClrSignal(SIGF_SINGLE);
-
- // Wait for the queue to become smaller
-
- Wait(SIGF_SINGLE);
-
- // Gain access to the handle
-
- ObtainSemaphore(&Queue -> Access);
-
- // Reenable multitasking
-
- Permit();
- }
-
- // We are going to make the queue longer
-
- Queue -> QueueSize++;
- }
-
- // Add the item to the list
-
- AddTail((struct List *)&Queue -> MsgList,(struct Node *)Item);
-
- // Wake up the owner
-
- Signal(Queue -> SigTask,Queue -> SigMask);
-
- // Drop the semaphore
-
- ReleaseSemaphore(&Queue -> Access);
- }
- }
- }
-
- /* DeleteMsgItem(struct MsgItem *Item):
- *
- * Clean up after a message item.
- */
-
- VOID __regargs
- DeleteMsgItem(struct MsgItem *Item)
- {
- // Valid data?
-
- if(Item)
- {
- // Do we have a destructor for this item?
-
- if(Item -> Destructor)
- (*Item -> Destructor)(Item);
- }
- }
-
- /* CreateMsgItem(LONG Size):
- *
- * Allocate a new message item.
- */
-
- struct MsgItem * __regargs
- CreateMsgItem(LONG Size)
- {
- struct MsgItem *Item;
-
- // Allocate space for the item and add the default destructor
-
- if(Item = (struct MsgItem *)AllocVecPooled(Size,MEMF_ANY))
- Item -> Destructor = DestroyPooled;
-
- return(Item);
- }
-
- /* CreateMinMsgItem(LONG Size):
- *
- * Allocate a new message item, make sure that the header
- * is present.
- */
-
- struct MsgItem * __regargs
- CreateMinMsgItem(LONG Size)
- {
- struct MsgItem *Item;
-
- // Allocate space for the item and add the default destructor
-
- if(Item = (struct MsgItem *)AllocVecPooled(sizeof(struct MsgItem) + Size,MEMF_ANY))
- Item -> Destructor = DestroyPooled;
-
- return(Item);
- }
-
- /* UnlockMsgQueue(struct MsgQueue *Queue):
- *
- * Wake up all tasks waiting for the queue to become
- * smaller. Note: this will change the maximum queue
- * size.
- */
-
- VOID __regargs
- UnlockMsgQueue(struct MsgQueue *Queue)
- {
- // Valid data?
-
- if(Queue)
- {
- struct SemaphoreRequest *WaitRequest;
-
- // Gain access to the handle
-
- ObtainSemaphore(&Queue -> Access);
-
- // Notify all waiting tasks
-
- for(WaitRequest = (struct SemaphoreRequest *)Queue -> WaitList . mlh_Head ; WaitRequest -> sr_Link . mln_Succ ; WaitRequest = (struct SemaphoreRequest *)WaitRequest -> sr_Link . mln_Succ)
- Signal(WaitRequest -> sr_Waiter,SIGF_SINGLE);
-
- // Reset the maximum queue size
-
- Queue -> MaxSize = 0;
-
- // Drop the semaphore
-
- ReleaseSemaphore(&Queue -> Access);
- }
- }
-
- /* DeleteMsgQueue(struct MsgQueue *Queue):
- *
- * Free a queue handle.
- */
-
- VOID __regargs
- DeleteMsgQueue(struct MsgQueue *Queue)
- {
- // Valid data?
-
- if(Queue)
- {
- struct MsgItem *Item,*Next;
-
- // Make sure no tasks is waiting in the list
-
- UnlockMsgQueue(Queue);
-
- // Disable multitasking
-
- Forbid();
-
- // Clear the queue signal
-
- ClrSignal(Queue -> SigMask);
-
- // Wait until all tasks are finished
-
- while(Queue -> QueueSize)
- Wait(Queue -> SigMask);
-
- // Reenable multitasking
-
- Permit();
-
- // Remove each item from the list
-
- for(Item = (struct MsgItem *)Queue -> MsgList . mlh_Head ; Next = (struct MsgItem *)Item -> Link . mln_Succ ; Item = Next)
- DeleteMsgItem(Item);
-
- // Free the signal bit if necessary
-
- if(Queue -> SigBit != -1)
- FreeSignal(Queue -> SigBit);
-
- // Free the handle
-
- FreeVecPooled(Queue);
- }
- }
-
- /* CreateMsgQueue(ULONG SigMask,LONG MaxSize):
- *
- * Allocate a queue handle.
- */
-
- struct MsgQueue * __regargs
- CreateMsgQueue(ULONG SigMask,LONG MaxSize)
- {
- struct MsgQueue *Queue;
-
- // Allocate the queue handle
-
- if(Queue = (struct MsgQueue *)AllocVecPooled(sizeof(struct MsgQueue),MEMF_ANY | MEMF_PUBLIC | MEMF_CLEAR))
- {
- // Initialize the access semaphore
-
- InitSemaphore(&Queue -> Access);
-
- // Initialize the message item list
-
- NewList((struct List *)&Queue -> MsgList);
-
- // Reset the size data
-
- Queue -> QueueSize = 0;
- Queue -> MaxSize = MaxSize;
-
- // Initialize the queue wait list
-
- NewList((struct List *)&Queue -> WaitList);
-
- // Store the owner address
-
- Queue -> SigTask = FindTask(NULL);
-
- // Do we have a signal mask ready?
-
- if(SigMask)
- {
- // Use it
-
- Queue -> SigMask = SigMask;
- Queue -> SigBit = -1;
- }
- else
- {
- // Allocate a new signal
-
- if((Queue -> SigBit = AllocSignal(-1)) == -1)
- {
- FreeVecPooled(Queue);
-
- return(NULL);
- }
- else
- Queue -> SigMask = (1L << Queue -> SigBit);
- }
- }
-
- return(Queue);
- }
-
- /* SetQueueDiscard(struct MsgQueue *Queue,BOOL Mode):
- *
- * Set whether new items should be added to the
- * queue or whether they should rather get
- * discarded instead.
- */
-
- VOID __regargs
- SetQueueDiscard(struct MsgQueue *Queue,BOOL Mode)
- {
- if(Queue)
- {
- ObtainSemaphore(&Queue -> Access);
-
- Queue -> Discard = Mode;
-
- ReleaseSemaphore(&Queue -> Access);
- }
- }
-